Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions client/rpc/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package rpc
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"

"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/path"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
multicodec "github.com/multiformats/go-multicodec"
)
Expand Down Expand Up @@ -129,6 +132,72 @@ func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error {
return nil
}

func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) {
options, err := options.DagImportOptions(opts...)
if err != nil {
return nil, err
}

req := api.core().Request("dag/import")

if options.PinRootsSet {
req.Option("pin-roots", options.PinRoots)
}

if options.StatsSet {
req.Option("stats", options.Stats)
}

if options.FastProvideRootSet {
req.Option("fast-provide-root", options.FastProvideRoot)
}

if options.FastProvideWaitSet {
req.Option("fast-provide-wait", options.FastProvideWait)
}

req.Body(files.NewMultiFileReader(files.NewMapDirectory(map[string]files.Node{"": file}), false, false))

resp, err := req.Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}

out := make(chan iface.DagImportResult)

go func() {
defer resp.Close()
defer close(out)

dec := json.NewDecoder(resp.Output)

for {
var event iface.DagImportResult

if err := dec.Decode(&event); err != nil {
if err != io.EOF {
select {
case out <- iface.DagImportResult{Err: err}:
case <-ctx.Done():
}
}
return
}

select {
case out <- event:
case <-ctx.Done():
return
}
}
}()

return out, nil
}

func (api *httpNodeAdder) core() *HttpApi {
return (*HttpApi)(api)
}
Expand Down
176 changes: 176 additions & 0 deletions client/rpc/dag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package rpc

import (
"context"
"os"
"testing"

"github.com/ipfs/boxo/files"
"github.com/ipfs/go-cid"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/require"
)

func TestDagImport_Basic(t *testing.T) {
t.Parallel()

ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon()
defer node.StopDaemon()

apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)

api, err := NewApi(apiMaddr)
require.NoError(t, err)

// Open test fixture
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()

// Import CAR file
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile))
require.NoError(t, err)

// Collect results
var roots []cid.Cid
for result := range results {
if result.Root != nil {
roots = append(roots, result.Root.Cid)
require.Empty(t, result.Root.PinErrorMsg, "pin should succeed")
}
}

// Verify we got exactly one root
require.Len(t, roots, 1, "should have exactly one root")

// Verify the expected root CID
expectedRoot := "bafyreifrm6uf5o4dsaacuszf35zhibyojlqclabzrms7iak67pf62jygaq"
require.Equal(t, expectedRoot, roots[0].String())
}

func TestDagImport_WithStats(t *testing.T) {
t.Parallel()

ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon()
defer node.StopDaemon()

apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)

api, err := NewApi(apiMaddr)
require.NoError(t, err)

carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()

// Import with stats enabled
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
options.Dag.Stats(true))
require.NoError(t, err)

var roots []cid.Cid
var gotStats bool
var blockCount uint64

for result := range results {
if result.Root != nil {
roots = append(roots, result.Root.Cid)
}
if result.Stats != nil {
gotStats = true
blockCount = result.Stats.BlockCount
}
}

require.Len(t, roots, 1, "should have one root")
require.True(t, gotStats, "should receive stats")
require.Equal(t, uint64(4), blockCount, "TestDagStat.car has 4 blocks")
}

func TestDagImport_OfflineWithFastProvide(t *testing.T) {
t.Parallel()

ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon("--offline=true")
defer node.StopDaemon()

apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)

api, err := NewApi(apiMaddr)
require.NoError(t, err)

carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()

// Import with fast-provide enabled in offline mode
// Should succeed gracefully (fast-provide silently skipped)
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
options.Dag.FastProvideRoot(true),
options.Dag.FastProvideWait(true))
require.NoError(t, err)

var roots []cid.Cid
for result := range results {
if result.Root != nil {
roots = append(roots, result.Root.Cid)
}
}

require.Len(t, roots, 1, "import should succeed offline with fast-provide enabled")
}

func TestDagImport_OnlineWithFastProvideWait(t *testing.T) {
t.Parallel()

ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon()
defer node.StopDaemon()

apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)

api, err := NewApi(apiMaddr)
require.NoError(t, err)

carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()

// Import with fast-provide wait enabled in online mode.
// This tests that FastProvideWait actually blocks (not fire-and-forget).
// In isolated test environment with no DHT peers, the blocking provide
// operation should fail and propagate an error.
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
options.Dag.FastProvideRoot(true),
options.Dag.FastProvideWait(true))

// Initial call may succeed, but we should get error from results channel
if err == nil {
// Consume results until we hit the expected error
var gotError bool
for result := range results {
if result.Err != nil {
gotError = true
require.Contains(t, result.Err.Error(), "fast-provide",
"error should be from fast-provide operation")
break
}
}
require.True(t, gotError, "should receive fast-provide error in isolated test environment")
} else {
// Error returned directly (also acceptable)
require.Contains(t, err.Error(), "fast-provide",
"error should be from fast-provide operation")
}
}
26 changes: 26 additions & 0 deletions client/rpc/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"

"github.com/ipfs/boxo/path"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
Expand Down Expand Up @@ -156,6 +157,31 @@ func (api *RoutingAPI) Provide(ctx context.Context, p path.Path, opts ...options
Exec(ctx, nil)
}

func (api *RoutingAPI) ProvideStats(ctx context.Context, opts ...options.RoutingProvideStatOption) (*iface.ProvideStatsResponse, error) {
options, err := options.RoutingProvideStatOptions(opts...)
if err != nil {
return nil, err
}

resp, err := api.core().Request("provide/stat").
Option("lan", options.UseLAN).
Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
defer resp.Close()

var out iface.ProvideStatsResponse
if err := json.NewDecoder(resp.Output).Decode(&out); err != nil {
return nil, err
}

return &out, nil
}

func (api *RoutingAPI) core() *HttpApi {
return (*HttpApi)(api)
}
Loading
Loading