diff --git a/client/rpc/dag.go b/client/rpc/dag.go index 63cac8f61ac..ee059429435 100644 --- a/client/rpc/dag.go +++ b/client/rpc/dag.go @@ -3,13 +3,16 @@ package rpc import ( "bytes" "context" + "encoding/json" "fmt" "io" + "github.com/ipfs/boxo/files" "github.com/ipfs/boxo/path" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" + iface "github.com/ipfs/kubo/core/coreiface" "github.com/ipfs/kubo/core/coreiface/options" multicodec "github.com/multiformats/go-multicodec" ) @@ -129,6 +132,72 @@ func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error { return nil } +func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) { + options, err := options.DagImportOptions(opts...) + if err != nil { + return nil, err + } + + req := api.core().Request("dag/import") + + if options.PinRootsSet { + req.Option("pin-roots", options.PinRoots) + } + + if options.StatsSet { + req.Option("stats", options.Stats) + } + + if options.FastProvideRootSet { + req.Option("fast-provide-root", options.FastProvideRoot) + } + + if options.FastProvideWaitSet { + req.Option("fast-provide-wait", options.FastProvideWait) + } + + req.Body(files.NewMultiFileReader(files.NewMapDirectory(map[string]files.Node{"": file}), false, false)) + + resp, err := req.Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + out := make(chan iface.DagImportResult) + + go func() { + defer resp.Close() + defer close(out) + + dec := json.NewDecoder(resp.Output) + + for { + var event iface.DagImportResult + + if err := dec.Decode(&event); err != nil { + if err != io.EOF { + select { + case out <- iface.DagImportResult{Err: err}: + case <-ctx.Done(): + } + } + return + } + + select { + case out <- event: + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + func (api *httpNodeAdder) core() *HttpApi { return (*HttpApi)(api) } diff --git a/client/rpc/dag_test.go b/client/rpc/dag_test.go new file mode 100644 index 00000000000..e0d69d7c1cc --- /dev/null +++ b/client/rpc/dag_test.go @@ -0,0 +1,176 @@ +package rpc + +import ( + "context" + "os" + "testing" + + "github.com/ipfs/boxo/files" + "github.com/ipfs/go-cid" + "github.com/ipfs/kubo/core/coreiface/options" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/stretchr/testify/require" +) + +func TestDagImport_Basic(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Open test fixture + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import CAR file + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile)) + require.NoError(t, err) + + // Collect results + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + require.Empty(t, result.Root.PinErrorMsg, "pin should succeed") + } + } + + // Verify we got exactly one root + require.Len(t, roots, 1, "should have exactly one root") + + // Verify the expected root CID + expectedRoot := "bafyreifrm6uf5o4dsaacuszf35zhibyojlqclabzrms7iak67pf62jygaq" + require.Equal(t, expectedRoot, roots[0].String()) +} + +func TestDagImport_WithStats(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with stats enabled + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.Stats(true)) + require.NoError(t, err) + + var roots []cid.Cid + var gotStats bool + var blockCount uint64 + + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + if result.Stats != nil { + gotStats = true + blockCount = result.Stats.BlockCount + } + } + + require.Len(t, roots, 1, "should have one root") + require.True(t, gotStats, "should receive stats") + require.Equal(t, uint64(4), blockCount, "TestDagStat.car has 4 blocks") +} + +func TestDagImport_OfflineWithFastProvide(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon("--offline=true") + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with fast-provide enabled in offline mode + // Should succeed gracefully (fast-provide silently skipped) + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.FastProvideRoot(true), + options.Dag.FastProvideWait(true)) + require.NoError(t, err) + + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + } + + require.Len(t, roots, 1, "import should succeed offline with fast-provide enabled") +} + +func TestDagImport_OnlineWithFastProvideWait(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with fast-provide wait enabled in online mode. + // This tests that FastProvideWait actually blocks (not fire-and-forget). + // In isolated test environment with no DHT peers, the blocking provide + // operation should fail and propagate an error. + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.FastProvideRoot(true), + options.Dag.FastProvideWait(true)) + + // Initial call may succeed, but we should get error from results channel + if err == nil { + // Consume results until we hit the expected error + var gotError bool + for result := range results { + if result.Err != nil { + gotError = true + require.Contains(t, result.Err.Error(), "fast-provide", + "error should be from fast-provide operation") + break + } + } + require.True(t, gotError, "should receive fast-provide error in isolated test environment") + } else { + // Error returned directly (also acceptable) + require.Contains(t, err.Error(), "fast-provide", + "error should be from fast-provide operation") + } +} diff --git a/client/rpc/routing.go b/client/rpc/routing.go index 693f155c6b0..54e3f84b9a5 100644 --- a/client/rpc/routing.go +++ b/client/rpc/routing.go @@ -7,6 +7,7 @@ import ( "encoding/json" "github.com/ipfs/boxo/path" + iface "github.com/ipfs/kubo/core/coreiface" "github.com/ipfs/kubo/core/coreiface/options" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" @@ -156,6 +157,31 @@ func (api *RoutingAPI) Provide(ctx context.Context, p path.Path, opts ...options Exec(ctx, nil) } +func (api *RoutingAPI) ProvideStats(ctx context.Context, opts ...options.RoutingProvideStatOption) (*iface.ProvideStatsResponse, error) { + options, err := options.RoutingProvideStatOptions(opts...) + if err != nil { + return nil, err + } + + resp, err := api.core().Request("provide/stat"). + Option("lan", options.UseLAN). + Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + defer resp.Close() + + var out iface.ProvideStatsResponse + if err := json.NewDecoder(resp.Output).Decode(&out); err != nil { + return nil, err + } + + return &out, nil +} + func (api *RoutingAPI) core() *HttpApi { return (*HttpApi)(api) } diff --git a/client/rpc/routing_test.go b/client/rpc/routing_test.go new file mode 100644 index 00000000000..39299291286 --- /dev/null +++ b/client/rpc/routing_test.go @@ -0,0 +1,177 @@ +package rpc + +import ( + "context" + "encoding/json" + "testing" + + boxoprovider "github.com/ipfs/boxo/provider" + iface "github.com/ipfs/kubo/core/coreiface" + "github.com/ipfs/kubo/core/coreiface/options" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/libp2p/go-libp2p-kad-dht/provider/stats" + "github.com/stretchr/testify/require" +) + +func TestProvideStats_JSONCompatibility(t *testing.T) { + // Verify that command's provideStats structure is compatible with + // iface.ProvideStatsResponse for JSON marshaling/unmarshaling. + // This ensures RPC client can correctly decode responses from the command. + + // Create instance of command's provideStats structure + cmdStats := struct { + Sweep *stats.Stats `json:"Sweep,omitempty"` + Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"` + FullRT bool `json:"FullRT,omitempty"` + }{ + Sweep: &stats.Stats{}, + FullRT: true, + } + + // Marshal command structure to JSON + data, err := json.Marshal(cmdStats) + require.NoError(t, err, "should marshal command stats") + + // Unmarshal into interface type + var ifaceStats iface.ProvideStatsResponse + err = json.Unmarshal(data, &ifaceStats) + require.NoError(t, err, "should unmarshal into interface stats") + + // Verify fields transferred correctly + require.NotNil(t, ifaceStats.Sweep, "Sweep field should be present") + require.Nil(t, ifaceStats.Legacy, "Legacy field should be nil") + require.True(t, ifaceStats.FullRT, "FullRT field should be true") +} + +// testProvideStats mirrors the subset of fields we verify in tests. +// Intentionally independent from coreiface types to detect breaking changes. +type testProvideStats struct { + Sweep *struct { + Connectivity struct { + Status string `json:"status"` + } `json:"connectivity"` + Queues struct { + PendingKeyProvides int `json:"pending_key_provides"` + } `json:"queues"` + Schedule struct { + Keys int `json:"keys"` + } `json:"schedule"` + } `json:"Sweep,omitempty"` + Legacy *struct { + TotalReprovides uint64 `json:"TotalReprovides"` + } `json:"Legacy,omitempty"` +} + +func TestProvideStats_WithSweepProvider(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Explicitly enable Sweep provider (default in v0.39) + node.SetIPFSConfig("Provide.DHT.SweepEnabled", true) + node.SetIPFSConfig("Provide.Enabled", true) + + node.StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Get provide stats + result, err := api.Routing().ProvideStats(ctx) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify Sweep stats are present, Legacy is not + require.NotNil(t, result.Sweep, "Sweep provider should return Sweep stats") + require.Nil(t, result.Legacy, "Sweep provider should not return Legacy stats") + + // Marshal to JSON and unmarshal to test struct to verify structure + data, err := json.Marshal(result) + require.NoError(t, err) + + var testStats testProvideStats + err = json.Unmarshal(data, &testStats) + require.NoError(t, err) + + // Verify key fields exist and have reasonable values + require.NotNil(t, testStats.Sweep) + require.NotEmpty(t, testStats.Sweep.Connectivity.Status, "connectivity status should be present") + require.GreaterOrEqual(t, testStats.Sweep.Queues.PendingKeyProvides, 0, "queue size should be non-negative") + require.GreaterOrEqual(t, testStats.Sweep.Schedule.Keys, 0, "scheduled keys should be non-negative") +} + +func TestProvideStats_WithLegacyProvider(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Explicitly disable Sweep to use Legacy provider + node.SetIPFSConfig("Provide.DHT.SweepEnabled", false) + node.SetIPFSConfig("Provide.Enabled", true) + + node.StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Get provide stats + result, err := api.Routing().ProvideStats(ctx) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify Legacy stats are present, Sweep is not + require.Nil(t, result.Sweep, "Legacy provider should not return Sweep stats") + require.NotNil(t, result.Legacy, "Legacy provider should return Legacy stats") + + // Marshal to JSON and unmarshal to test struct to verify structure + data, err := json.Marshal(result) + require.NoError(t, err) + + var testStats testProvideStats + err = json.Unmarshal(data, &testStats) + require.NoError(t, err) + + // Verify Legacy field exists + require.NotNil(t, testStats.Legacy) + require.GreaterOrEqual(t, testStats.Legacy.TotalReprovides, uint64(0), "total reprovides should be non-negative") +} + +func TestProvideStats_LANFlagErrorWithLegacy(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Use Legacy provider - LAN flag should error + node.SetIPFSConfig("Provide.DHT.SweepEnabled", false) + node.SetIPFSConfig("Provide.Enabled", true) + + node.StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Try to get LAN stats with Legacy provider + // This should return an error + _, err = api.Routing().ProvideStats(ctx, options.Routing.UseLAN(true)) + require.Error(t, err, "LAN flag should error with Legacy provider") + require.Contains(t, err.Error(), "LAN stats only available for Sweep provider with Dual DHT", + "error should indicate LAN stats unavailable") +} diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go index 316cc21a8ec..68586472b8d 100644 --- a/client/rpc/unixfs.go +++ b/client/rpc/unixfs.go @@ -56,6 +56,14 @@ func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.Unix req.Option("raw-leaves", options.RawLeaves) } + if options.FastProvideRootSet { + req.Option("fast-provide-root", options.FastProvideRoot) + } + + if options.FastProvideWaitSet { + req.Option("fast-provide-wait", options.FastProvideWait) + } + switch options.Layout { case caopts.BalancedLayout: // noop, default diff --git a/core/commands/dag/import.go b/core/commands/dag/import.go index 032b9e52a6c..3778c60ba9d 100644 --- a/core/commands/dag/import.go +++ b/core/commands/dag/import.go @@ -2,26 +2,14 @@ package dagcmd import ( "errors" - "fmt" - "io" "github.com/ipfs/boxo/files" - blocks "github.com/ipfs/go-block-format" - cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" - ipld "github.com/ipfs/go-ipld-format" - ipldlegacy "github.com/ipfs/go-ipld-legacy" - logging "github.com/ipfs/go-log/v2" "github.com/ipfs/kubo/config" - "github.com/ipfs/kubo/core/coreiface/options" - gocarv2 "github.com/ipld/go-car/v2" - "github.com/ipfs/kubo/core/commands/cmdenv" - "github.com/ipfs/kubo/core/commands/cmdutils" + "github.com/ipfs/kubo/core/coreiface/options" ) -var log = logging.Logger("core/commands") - func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { node, err := cmdenv.GetNode(env) if err != nil { @@ -38,62 +26,31 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment return err } - blockDecoder := ipldlegacy.NewDecoder() - - // on import ensure we do not reach out to the network for any reason - // if a pin based on what is imported + what is in the blockstore - // isn't possible: tough luck + // Ensure offline mode - import should not reach out to network api, err = api.WithOptions(options.Api.Offline(true)) if err != nil { return err } + // Parse options doPinRoots, _ := req.Options[pinRootsOptionName].(bool) - + doStats, _ := req.Options[statsOptionName].(bool) fastProvideRoot, fastProvideRootSet := req.Options[fastProvideRootOptionName].(bool) fastProvideWait, fastProvideWaitSet := req.Options[fastProvideWaitOptionName].(bool) + // Resolve fast-provide options from config if not explicitly set fastProvideRoot = config.ResolveBoolFromConfig(fastProvideRoot, fastProvideRootSet, cfg.Import.FastProvideRoot, config.DefaultFastProvideRoot) fastProvideWait = config.ResolveBoolFromConfig(fastProvideWait, fastProvideWaitSet, cfg.Import.FastProvideWait, config.DefaultFastProvideWait) - // grab a pinlock ( which doubles as a GC lock ) so that regardless of the - // size of the streamed-in cars nothing will disappear on us before we had - // a chance to roots that may show up at the very end - // This is especially important for use cases like dagger: - // ipfs dag import $( ... | ipfs-dagger --stdout=carfifos ) - // - if doPinRoots { - unlocker := node.Blockstore.PinLock(req.Context) - defer unlocker.Unlock(req.Context) - } - - // this is *not* a transaction - // it is simply a way to relieve pressure on the blockstore - // similar to pinner.Pin/pinner.Flush - batch := ipld.NewBatch(req.Context, api.Dag(), - // Default: 128. Means 128 file descriptors needed in flatfs - ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))), - // Default 100MiB. When setting block size to 1MiB, we can add - // ~100 nodes maximum. With default 256KiB block-size, we will - // hit the max nodes limit at 32MiB.p - ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))), - ) - - roots := cid.NewSet() - var blockCount, blockBytesCount uint64 - - // remember last valid block and provide a meaningful error message - // when a truncated/mangled CAR is being imported - importError := func(previous blocks.Block, current blocks.Block, err error) error { - if current != nil { - return fmt.Errorf("import failed at block %q: %w", current.Cid(), err) - } - if previous != nil { - return fmt.Errorf("import failed after block %q: %w", previous.Cid(), err) - } - return fmt.Errorf("import failed: %w", err) + // Build CoreAPI options + dagOpts := []options.DagImportOption{ + options.Dag.PinRoots(doPinRoots), + options.Dag.Stats(doStats), + options.Dag.FastProvideRoot(fastProvideRoot), + options.Dag.FastProvideWait(fastProvideWait), } + // Process each file it := req.Files.Entries() for it.Next() { file := files.FileFromEntry(it) @@ -101,118 +58,44 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment return errors.New("expected a file handle") } - // import blocks - err = func() error { - // wrap a defer-closer-scope - // - // every single file in it() is already open before we start - // just close here sooner rather than later for neatness - // and to surface potential errors writing on closed fifos - // this won't/can't help with not running out of handles - defer file.Close() - - var previous blocks.Block - - car, err := gocarv2.NewBlockReader(file) - if err != nil { - return err - } + // Call CoreAPI to import the file + resultChan, err := api.Dag().Import(req.Context, file, dagOpts...) + if err != nil { + return err + } - for _, c := range car.Roots { - roots.Add(c) + // Stream results back to user + for result := range resultChan { + // Check for errors from CoreAPI + if result.Err != nil { + return result.Err } - for { - block, err := car.Next() - if err != nil && err != io.EOF { - return importError(previous, block, err) - } else if block == nil { - break - } - if err := cmdutils.CheckBlockSize(req, uint64(len(block.RawData()))); err != nil { - return importError(previous, block, err) - } - - // the double-decode is suboptimal, but we need it for batching - nd, err := blockDecoder.DecodeNode(req.Context, block) + // Emit root results + if result.Root != nil { + err := res.Emit(&CarImportOutput{ + Root: &RootMeta{ + Cid: result.Root.Cid, + PinErrorMsg: result.Root.PinErrorMsg, + }, + }) if err != nil { - return importError(previous, block, err) + return err } - - if err := batch.Add(req.Context, nd); err != nil { - return importError(previous, block, err) - } - blockCount++ - blockBytesCount += uint64(len(block.RawData())) - previous = block } - return nil - }() - if err != nil { - return err - } - } - - if err := batch.Commit(); err != nil { - return err - } - // It is not guaranteed that a root in a header is actually present in the same ( or any ) - // .car file. This is the case in version 1, and ideally in further versions too. - // Accumulate any root CID seen in a header, and supplement its actual node if/when encountered - // We will attempt a pin *only* at the end in case all car files were well-formed. - - // opportunistic pinning: try whatever sticks - if doPinRoots { - err = roots.ForEach(func(c cid.Cid) error { - ret := RootMeta{Cid: c} - - // This will trigger a full read of the DAG in the pinner, to make sure we have all blocks. - // Ideally we would do colloring of the pinning state while importing the blocks - // and ensure the gray bucket is empty at the end (or use the network to download missing blocks). - if block, err := node.Blockstore.Get(req.Context, c); err != nil { - ret.PinErrorMsg = err.Error() - } else if nd, err := blockDecoder.DecodeNode(req.Context, block); err != nil { - ret.PinErrorMsg = err.Error() - } else if err := node.Pinning.Pin(req.Context, nd, true, ""); err != nil { - ret.PinErrorMsg = err.Error() - } else if err := node.Pinning.Flush(req.Context); err != nil { - ret.PinErrorMsg = err.Error() + // Emit stats results + if result.Stats != nil { + err := res.Emit(&CarImportOutput{ + Stats: &CarImportStats{ + BlockCount: result.Stats.BlockCount, + BlockBytesCount: result.Stats.BlockBytesCount, + }, + }) + if err != nil { + return err + } } - - return res.Emit(&CarImportOutput{Root: &ret}) - }) - if err != nil { - return err - } - } - - stats, _ := req.Options[statsOptionName].(bool) - if stats { - err = res.Emit(&CarImportOutput{ - Stats: &CarImportStats{ - BlockCount: blockCount, - BlockBytesCount: blockBytesCount, - }, - }) - if err != nil { - return err - } - } - - // Fast-provide roots for faster discovery - if fastProvideRoot { - err = roots.ForEach(func(c cid.Cid) error { - return cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false) - }) - if err != nil { - return err - } - } else { - if fastProvideWait { - log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true) - } else { - log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config") } } diff --git a/core/commands/provide.go b/core/commands/provide.go index c9d3954cfe9..0b756b27b36 100644 --- a/core/commands/provide.go +++ b/core/commands/provide.go @@ -15,10 +15,8 @@ import ( cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/kubo/core/commands/cmdenv" + "github.com/ipfs/kubo/core/coreiface/options" "github.com/libp2p/go-libp2p-kad-dht/fullrt" - "github.com/libp2p/go-libp2p-kad-dht/provider" - "github.com/libp2p/go-libp2p-kad-dht/provider/buffered" - "github.com/libp2p/go-libp2p-kad-dht/provider/dual" "github.com/libp2p/go-libp2p-kad-dht/provider/stats" routing "github.com/libp2p/go-libp2p/core/routing" "github.com/probe-lab/go-libdht/kad/key" @@ -136,26 +134,6 @@ type provideStats struct { FullRT bool // only used for legacy stats } -// extractSweepingProvider extracts a SweepingProvider from the given provider interface. -// It handles unwrapping buffered and dual providers, selecting LAN or WAN as specified. -// Returns nil if the provider is not a sweeping provider type. -func extractSweepingProvider(prov any, useLAN bool) *provider.SweepingProvider { - switch p := prov.(type) { - case *provider.SweepingProvider: - return p - case *dual.SweepingProvider: - if useLAN { - return p.LAN - } - return p.WAN - case *buffered.SweepingProvider: - // Recursively extract from the inner provider - return extractSweepingProvider(p.Provider, useLAN) - default: - return nil - } -} - var provideStatCmd = &cmds.Command{ Status: cmds.Experimental, Helptext: cmds.HelpText{ @@ -234,41 +212,36 @@ NOTES: cmds.BoolOption(provideStatQueuesOptionName, "Display provide and reprovide queue sizes"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - nd, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env, req) if err != nil { return err } - if !nd.IsOnline { - return ErrNotOnline - } - lanStats, _ := req.Options[provideLanOptionName].(bool) - // Handle legacy provider - if legacySys, ok := nd.Provider.(boxoprovider.System); ok { - if lanStats { - return errors.New("LAN stats only available for Sweep provider with Dual DHT") - } - stats, err := legacySys.Stat() - if err != nil { - return err - } - _, fullRT := nd.DHTClient.(*fullrt.FullRT) - return res.Emit(provideStats{Legacy: &stats, FullRT: fullRT}) + // Get stats from CoreAPI + opts := []options.RoutingProvideStatOption{} + if lanStats { + opts = append(opts, options.Routing.UseLAN(true)) } - // Extract sweeping provider (handles buffered and dual unwrapping) - sweepingProvider := extractSweepingProvider(nd.Provider, lanStats) - if sweepingProvider == nil { - if lanStats { - return errors.New("LAN stats only available for Sweep provider with Dual DHT") - } - return fmt.Errorf("stats not available with current routing system %T", nd.Provider) + result, err := api.Routing().ProvideStats(req.Context, opts...) + if err != nil { + return err + } + + // Set FullRT field for display (command-layer presentation concern) + nd, err := cmdenv.GetNode(env) + if err != nil { + return err } + _, fullRT := nd.DHTClient.(*fullrt.FullRT) - s := sweepingProvider.Stats() - return res.Emit(provideStats{Sweep: &s}) + return res.Emit(provideStats{ + Sweep: result.Sweep, + Legacy: result.Legacy, + FullRT: fullRT, + }) }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s provideStats) error { diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index 70686f62e03..9a0ffb600eb 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -2,11 +2,20 @@ package coreapi import ( "context" + "fmt" + "io" + "github.com/ipfs/boxo/files" dag "github.com/ipfs/boxo/ipld/merkledag" pin "github.com/ipfs/boxo/pinning/pinner" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + ipldlegacy "github.com/ipfs/go-ipld-legacy" + "github.com/ipfs/kubo/config" + iface "github.com/ipfs/kubo/core/coreiface" + "github.com/ipfs/kubo/core/coreiface/options" + gocarv2 "github.com/ipld/go-car/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -68,6 +77,256 @@ func (api *dagAPI) Session(ctx context.Context) ipld.NodeGetter { return dag.NewSession(ctx, api.DAGService) } +func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) { + // Parse options + settings, err := options.DagImportOptions(opts...) + if err != nil { + return nil, err + } + + // Get config for batch settings + cfg, err := api.core.repo.Config() + if err != nil { + return nil, err + } + + // Create block decoder for IPLD nodes + blockDecoder := ipldlegacy.NewDecoder() + + // Create batch for efficient block addition. + // Uses config values for batch size tuning: + // - MaxNodes: Default 128 nodes per batch (128 file descriptors in flatfs) + // - MaxSize: Default 100MiB per batch (with 256KiB blocks, hits node limit at ~32MiB) + batch := ipld.NewBatch(ctx, api.DAGService, + ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))), + ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))), + ) + + // Create output channel + out := make(chan iface.DagImportResult) + + // Acquire pinlock BEFORE spawning goroutine if pinning roots + // This prevents race condition with GC (lock serves as both pin and GC lock) + var pinUnlocker func(context.Context) + if settings.PinRoots { + pinUnlocker = api.core.blockstore.PinLock(ctx).Unlock + } + + // Process import in background + go func() { + defer close(out) + defer file.Close() + if pinUnlocker != nil { + defer pinUnlocker(ctx) + } + + // Track roots from CAR headers and stats + roots := cid.NewSet() + var blockCount, blockBytesCount uint64 + + // Parse CAR file + car, err := gocarv2.NewBlockReader(file) + if err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to create CAR reader: %w", err)} + return + } + + // Collect roots from CAR header + for _, c := range car.Roots { + roots.Add(c) + } + + // Process all blocks from CAR file + var previous blocks.Block + for { + block, err := car.Next() + if err != nil { + if err != io.EOF { + if previous != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to read block after %s: %w", previous.Cid(), err)} + } else { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to read CAR blocks: %w", err)} + } + } + break + } + if block == nil { + break + } + + // Decode block into IPLD node + nd, err := blockDecoder.DecodeNode(ctx, block) + if err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to decode block %s: %w", block.Cid(), err)} + return + } + + // Add node to batch + if err := batch.Add(ctx, nd); err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to add block %s to batch: %w", nd.Cid(), err)} + return + } + + blockCount++ + blockBytesCount += uint64(len(block.RawData())) + previous = block + + // Check context cancellation + select { + case <-ctx.Done(): + out <- iface.DagImportResult{Err: ctx.Err()} + return + default: + } + } + + // Commit batch to blockstore + if err := batch.Commit(); err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to commit batch: %w", err)} + return + } + + // Emit all roots (with pin status if requested) + err = roots.ForEach(func(c cid.Cid) error { + result := iface.DagImportResult{ + Root: &iface.DagImportRoot{Cid: c}, + } + + // Attempt to pin if requested + if settings.PinRoots { + // Verify block exists in blockstore + block, err := api.core.blockstore.Get(ctx, c) + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("blockstore get: %v", err) + } else { + // Decode node for pinning + nd, err := blockDecoder.DecodeNode(ctx, block) + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("decode node: %v", err) + } else { + // Pin recursively + err = api.core.pinning.Pin(ctx, nd, true, "") + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("pin: %v", err) + } else { + // Flush pins to storage + err = api.core.pinning.Flush(ctx) + if err != nil { + result.Root.PinErrorMsg = fmt.Sprintf("flush: %v", err) + } + } + } + } + } + + // Send root result + select { + case out <- result: + case <-ctx.Done(): + return ctx.Err() + } + + return nil + }) + if err != nil { + out <- iface.DagImportResult{Err: fmt.Errorf("failed to emit roots: %w", err)} + return + } + + // Emit stats if requested + if settings.Stats { + select { + case out <- iface.DagImportResult{ + Stats: &iface.DagImportStats{ + BlockCount: blockCount, + BlockBytesCount: blockBytesCount, + }, + }: + case <-ctx.Done(): + return + } + } + + // Execute fast-provide (will check if enabled) + if err := api.executeFastProvide(ctx, cfg, roots, settings.FastProvideRoot, settings.FastProvideWait, settings.PinRoots, settings.PinRoots, false); err != nil { + select { + case out <- iface.DagImportResult{Err: err}: + case <-ctx.Done(): + } + } + }() + + return out, nil +} + +// executeFastProvide announces roots to the DHT for faster discovery +func (api *dagAPI) executeFastProvide(ctx context.Context, cfg *config.Config, roots *cid.Set, enabled bool, wait bool, isPinned bool, isPinnedRoot bool, isMFS bool) error { + // Check if fast-provide is enabled + if !enabled { + if wait { + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true) + } else { + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config") + } + return nil + } + + log.Debugw("fast-provide-root: enabled", "wait", wait) + + // Check preconditions for providing + if !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) { + log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false") + return nil + } + + if cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0 { + log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0") + return nil + } + + if !api.core.nd.HasActiveDHTClient() { + log.Debugw("fast-provide-root: skipped", "reason", "DHT not available") + return nil + } + + // Check provide strategy + strategyStr := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy) + strategy := config.ParseProvideStrategy(strategyStr) + shouldProvide := config.ShouldProvideForStrategy(strategy, isPinned, isPinnedRoot, isMFS) + + if !shouldProvide { + log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS) + return nil + } + + // Provide each root + return roots.ForEach(func(c cid.Cid) error { + if wait { + // Synchronous mode: block until provide completes + log.Debugw("fast-provide-root: providing synchronously", "cid", c) + if err := api.core.nd.DHTClient.Provide(ctx, c, true); err != nil { + log.Warnw("fast-provide-root: sync provide failed", "cid", c, "error", err) + return fmt.Errorf("fast-provide: %w", err) + } + log.Debugw("fast-provide-root: sync provide completed", "cid", c) + } else { + // Asynchronous mode: fire-and-forget in goroutine + log.Debugw("fast-provide-root: providing asynchronously", "cid", c) + go func(rootCid cid.Cid) { + // Use detached context with timeout to prevent hanging + asyncCtx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout) + defer cancel() + if err := api.core.nd.DHTClient.Provide(asyncCtx, rootCid, true); err != nil { + log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err) + } else { + log.Debugw("fast-provide-root: async provide completed", "cid", rootCid) + } + }(c) + } + return nil + }) +} + var ( _ ipld.DAGService = (*dagAPI)(nil) _ dag.SessionMaker = (*dagAPI)(nil) diff --git a/core/coreapi/routing.go b/core/coreapi/routing.go index b9c25805622..2910acdf280 100644 --- a/core/coreapi/routing.go +++ b/core/coreapi/routing.go @@ -11,12 +11,16 @@ import ( offline "github.com/ipfs/boxo/exchange/offline" dag "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/boxo/path" + boxoprovider "github.com/ipfs/boxo/provider" cid "github.com/ipfs/go-cid" cidutil "github.com/ipfs/go-cidutil" coreiface "github.com/ipfs/kubo/core/coreiface" caopts "github.com/ipfs/kubo/core/coreiface/options" "github.com/ipfs/kubo/core/node" "github.com/ipfs/kubo/tracing" + "github.com/libp2p/go-libp2p-kad-dht/provider" + "github.com/libp2p/go-libp2p-kad-dht/provider/buffered" + "github.com/libp2p/go-libp2p-kad-dht/provider/dual" peer "github.com/libp2p/go-libp2p/core/peer" mh "github.com/multiformats/go-multihash" "go.opentelemetry.io/otel/attribute" @@ -222,6 +226,63 @@ func provideKeysRec(ctx context.Context, prov node.DHTProvider, bs blockstore.Bl } } +// extractSweepingProvider extracts a SweepingProvider from the given provider interface. +// It handles unwrapping buffered and dual providers, selecting LAN or WAN as specified. +// Returns nil if the provider is not a sweeping provider type. +func extractSweepingProvider(prov any, useLAN bool) *provider.SweepingProvider { + switch p := prov.(type) { + case *provider.SweepingProvider: + return p + case *dual.SweepingProvider: + if useLAN { + return p.LAN + } + return p.WAN + case *buffered.SweepingProvider: + // Recursively extract from the inner provider + return extractSweepingProvider(p.Provider, useLAN) + default: + return nil + } +} + +func (api *RoutingAPI) ProvideStats(ctx context.Context, opts ...caopts.RoutingProvideStatOption) (*coreiface.ProvideStatsResponse, error) { + options, err := caopts.RoutingProvideStatOptions(opts...) + if err != nil { + return nil, err + } + + if !api.nd.IsOnline { + return nil, coreiface.ErrOffline + } + + // Handle legacy provider + if legacySys, ok := api.provider.(boxoprovider.System); ok { + if options.UseLAN { + return nil, errors.New("LAN stats only available for Sweep provider with Dual DHT") + } + stats, err := legacySys.Stat() + if err != nil { + return nil, err + } + // Note: FullRT field is not set here as we don't have access to nd.DHTClient + // This field is primarily for display purposes in the command + return &coreiface.ProvideStatsResponse{Legacy: &stats, FullRT: false}, nil + } + + // Extract sweeping provider (handles buffered and dual unwrapping) + sweepingProvider := extractSweepingProvider(api.provider, options.UseLAN) + if sweepingProvider == nil { + if options.UseLAN { + return nil, errors.New("LAN stats only available for Sweep provider with Dual DHT") + } + return nil, fmt.Errorf("stats not available with current routing system %T", api.provider) + } + + s := sweepingProvider.Stats() + return &coreiface.ProvideStatsResponse{Sweep: &s}, nil +} + func (api *RoutingAPI) core() coreiface.CoreAPI { return (*CoreAPI)(api) } diff --git a/core/coreiface/dag.go b/core/coreiface/dag.go index 3cc3aeb4de2..c0cf5acd9d4 100644 --- a/core/coreiface/dag.go +++ b/core/coreiface/dag.go @@ -1,13 +1,44 @@ package iface import ( + "context" + + "github.com/ipfs/boxo/files" + "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/kubo/core/coreiface/options" ) +// DagImportResult represents the result of importing roots or stats from CAR files. +// Each result has either Root or Stats set, never both. +type DagImportResult struct { + Root *DagImportRoot + Stats *DagImportStats + Err error +} + +// DagImportRoot represents a root CID from a CAR file header +type DagImportRoot struct { + Cid cid.Cid + PinErrorMsg string +} + +// DagImportStats contains statistics about the import operation +type DagImportStats struct { + BlockCount uint64 + BlockBytesCount uint64 +} + // APIDagService extends ipld.DAGService type APIDagService interface { ipld.DAGService // Pinning returns special NodeAdder which recursively pins added nodes Pinning() ipld.NodeAdder + + // Import imports data from CAR files. + // Returns a channel that streams results for each root CID found in CAR headers, + // and optionally stats at the end if requested via options. + // Supports importing multiple CAR files, each with multiple roots. + Import(context.Context, files.File, ...options.DagImportOption) (<-chan DagImportResult, error) } diff --git a/core/coreiface/options/dag.go b/core/coreiface/options/dag.go new file mode 100644 index 00000000000..90d4e1251a2 --- /dev/null +++ b/core/coreiface/options/dag.go @@ -0,0 +1,80 @@ +package options + +type DagImportSettings struct { + PinRoots bool + PinRootsSet bool + Stats bool + StatsSet bool + FastProvideRoot bool + FastProvideRootSet bool + FastProvideWait bool + FastProvideWaitSet bool +} + +type DagImportOption func(*DagImportSettings) error + +func DagImportOptions(opts ...DagImportOption) (*DagImportSettings, error) { + options := &DagImportSettings{ + PinRoots: false, + PinRootsSet: false, + Stats: false, + StatsSet: false, + FastProvideRoot: false, + FastProvideRootSet: false, + FastProvideWait: false, + FastProvideWaitSet: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + +type dagOpts struct{} + +var Dag dagOpts + +// PinRoots sets whether to pin roots listed in CAR headers after importing. +// If not set, server uses command default (true). +func (dagOpts) PinRoots(pin bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.PinRoots = pin + settings.PinRootsSet = true + return nil + } +} + +// Stats enables output of import statistics (block count and bytes). +// If not set, server uses command default (false). +func (dagOpts) Stats(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.Stats = enable + settings.StatsSet = true + return nil + } +} + +// FastProvideRoot sets whether to immediately provide root CIDs to DHT for faster discovery. +// If not set, server uses Import.FastProvideRoot config value (default: true). +func (dagOpts) FastProvideRoot(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.FastProvideRoot = enable + settings.FastProvideRootSet = true + return nil + } +} + +// FastProvideWait sets whether to block until fast provide completes. +// If not set, server uses Import.FastProvideWait config value (default: false). +func (dagOpts) FastProvideWait(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.FastProvideWait = enable + settings.FastProvideWaitSet = true + return nil + } +} diff --git a/core/coreiface/options/routing.go b/core/coreiface/options/routing.go index 8da7e7a1db2..55b054ce03c 100644 --- a/core/coreiface/options/routing.go +++ b/core/coreiface/options/routing.go @@ -96,3 +96,34 @@ func (routingOpts) AllowOffline(allow bool) RoutingPutOption { return nil } } + +type RoutingProvideStatSettings struct { + UseLAN bool +} + +type RoutingProvideStatOption func(*RoutingProvideStatSettings) error + +func RoutingProvideStatOptions(opts ...RoutingProvideStatOption) (*RoutingProvideStatSettings, error) { + options := &RoutingProvideStatSettings{ + UseLAN: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + +// UseLAN is an option for [Routing.ProvideStats] which specifies whether to +// return stats for LAN DHT only (only valid for Sweep provider with Dual DHT). +// Default value is false (WAN DHT stats). +func (routingOpts) UseLAN(useLAN bool) RoutingProvideStatOption { + return func(settings *RoutingProvideStatSettings) error { + settings.UseLAN = useLAN + return nil + } +} diff --git a/core/coreiface/options/unixfs.go b/core/coreiface/options/unixfs.go index 45e880ed1cb..b95e546216f 100644 --- a/core/coreiface/options/unixfs.go +++ b/core/coreiface/options/unixfs.go @@ -52,6 +52,11 @@ type UnixfsAddSettings struct { PreserveMtime bool Mode os.FileMode Mtime time.Time + + FastProvideRoot bool + FastProvideRootSet bool + FastProvideWait bool + FastProvideWaitSet bool } type UnixfsLsSettings struct { @@ -97,6 +102,11 @@ func UnixfsAddOptions(opts ...UnixfsAddOption) (*UnixfsAddSettings, cid.Prefix, PreserveMtime: false, Mode: 0, Mtime: time.Time{}, + + FastProvideRoot: false, + FastProvideRootSet: false, + FastProvideWait: false, + FastProvideWaitSet: false, } for _, opt := range opts { @@ -396,3 +406,23 @@ func (unixfsOpts) Mtime(seconds int64, nsecs uint32) UnixfsAddOption { return nil } } + +// FastProvideRoot sets whether to immediately provide root CID to DHT for faster discovery. +// If not set, server uses Import.FastProvideRoot config value (default: true). +func (unixfsOpts) FastProvideRoot(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.FastProvideRoot = enable + settings.FastProvideRootSet = true + return nil + } +} + +// FastProvideWait sets whether to block until fast provide completes. +// If not set, server uses Import.FastProvideWait config value (default: false). +func (unixfsOpts) FastProvideWait(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.FastProvideWait = enable + settings.FastProvideWaitSet = true + return nil + } +} diff --git a/core/coreiface/routing.go b/core/coreiface/routing.go index a17dfcad920..4476884c914 100644 --- a/core/coreiface/routing.go +++ b/core/coreiface/routing.go @@ -4,7 +4,9 @@ import ( "context" "github.com/ipfs/boxo/path" + boxoprovider "github.com/ipfs/boxo/provider" "github.com/ipfs/kubo/core/coreiface/options" + "github.com/libp2p/go-libp2p-kad-dht/provider/stats" "github.com/libp2p/go-libp2p/core/peer" ) @@ -26,4 +28,15 @@ type RoutingAPI interface { // Provide announces to the network that you are providing given values Provide(context.Context, path.Path, ...options.RoutingProvideOption) error + + // ProvideStats returns statistics about the provide system. + // Returns stats for either sweep provider (new default) or legacy provider. + ProvideStats(context.Context, ...options.RoutingProvideStatOption) (*ProvideStatsResponse, error) +} + +// ProvideStatsResponse contains statistics about the provide system +type ProvideStatsResponse struct { + Sweep *stats.Stats `json:"Sweep,omitempty"` + Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"` + FullRT bool `json:"FullRT,omitempty"` } diff --git a/test/cli/dag_test.go b/test/cli/dag_test.go index f6758a71037..858b61fd76f 100644 --- a/test/cli/dag_test.go +++ b/test/cli/dag_test.go @@ -120,7 +120,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -146,7 +146,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -183,7 +183,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -226,7 +226,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -254,7 +254,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") @@ -284,7 +284,7 @@ func TestDagImportFastProvide(t *testing.T) { node.StartDaemonWithReq(harness.RunRequest{ CmdOpts: []harness.CmdOpt{ harness.RunWithEnv(map[string]string{ - "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + "GOLOG_LOG_LEVEL": "error,coreapi=debug", }), }, }, "") diff --git a/test/cli/provide_stats_test.go b/test/cli/provide_stats_test.go index fede31c0fc3..13689a77a79 100644 --- a/test/cli/provide_stats_test.go +++ b/test/cli/provide_stats_test.go @@ -180,7 +180,7 @@ func TestProvideStatBasic(t *testing.T) { res := node.RunIPFS("provide", "stat") assert.Error(t, res.Err) - assert.Contains(t, res.Stderr.String(), "this command must be run in online mode") + assert.Contains(t, res.Stderr.String(), "this action must be run in online mode") }) } diff --git a/test/dependencies/go.mod b/test/dependencies/go.mod index 68c5a99c642..d5d8fda5045 100644 --- a/test/dependencies/go.mod +++ b/test/dependencies/go.mod @@ -249,6 +249,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/polyfloyd/go-errorlint v1.7.1 // indirect + github.com/probe-lab/go-libdht v0.4.0 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect diff --git a/test/dependencies/go.sum b/test/dependencies/go.sum index 005747af87a..01fb2868be5 100644 --- a/test/dependencies/go.sum +++ b/test/dependencies/go.sum @@ -650,6 +650,8 @@ github.com/polyfloyd/go-errorlint v1.7.1 h1:RyLVXIbosq1gBdk/pChWA8zWYLsq9UEw7a1L github.com/polyfloyd/go-errorlint v1.7.1/go.mod h1:aXjNb1x2TNhoLsk26iv1yl7a+zTnXPhwEMtEXukiLR8= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= +github.com/probe-lab/go-libdht v0.4.0 h1:LAqHuko/owRW6+0cs5wmJXbHzg09EUMJEh5DI37yXqo= +github.com/probe-lab/go-libdht v0.4.0/go.mod h1:hamw22kI6YkPQFGy5P6BrWWDrgE9ety5Si8iWAyuDvc= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=