Skip to content

Commit 7c5db10

Browse files
committed
feat(rpc): add dag import with fast-provide support
adds Import method to APIDagService interface and RPC client implementation - new DagImportResult, DagImportRoot, DagImportStats types in coreiface - DagImportOptions with uniform Set pattern for all params (PinRoots, Stats, FastProvideRoot, FastProvideWait) - streaming channel API for handling multiple roots and stats - tests covering basic import, stats, offline mode, and blocking wait
1 parent 91dd98c commit 7c5db10

File tree

4 files changed

+356
-0
lines changed

4 files changed

+356
-0
lines changed

client/rpc/dag.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ package rpc
33
import (
44
"bytes"
55
"context"
6+
"encoding/json"
67
"fmt"
78
"io"
89

10+
"github.com/ipfs/boxo/files"
911
"github.com/ipfs/boxo/path"
1012
blocks "github.com/ipfs/go-block-format"
1113
"github.com/ipfs/go-cid"
1214
format "github.com/ipfs/go-ipld-format"
15+
iface "github.com/ipfs/kubo/core/coreiface"
1316
"github.com/ipfs/kubo/core/coreiface/options"
1417
multicodec "github.com/multiformats/go-multicodec"
1518
)
@@ -129,6 +132,72 @@ func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error {
129132
return nil
130133
}
131134

135+
func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) {
136+
options, err := options.DagImportOptions(opts...)
137+
if err != nil {
138+
return nil, err
139+
}
140+
141+
req := api.core().Request("dag/import")
142+
143+
if options.PinRootsSet {
144+
req.Option("pin-roots", options.PinRoots)
145+
}
146+
147+
if options.StatsSet {
148+
req.Option("stats", options.Stats)
149+
}
150+
151+
if options.FastProvideRootSet {
152+
req.Option("fast-provide-root", options.FastProvideRoot)
153+
}
154+
155+
if options.FastProvideWaitSet {
156+
req.Option("fast-provide-wait", options.FastProvideWait)
157+
}
158+
159+
req.Body(files.NewMultiFileReader(files.NewMapDirectory(map[string]files.Node{"": file}), false, false))
160+
161+
resp, err := req.Send(ctx)
162+
if err != nil {
163+
return nil, err
164+
}
165+
if resp.Error != nil {
166+
return nil, resp.Error
167+
}
168+
169+
out := make(chan iface.DagImportResult)
170+
171+
go func() {
172+
defer resp.Close()
173+
defer close(out)
174+
175+
dec := json.NewDecoder(resp.Output)
176+
177+
for {
178+
var event iface.DagImportResult
179+
180+
if err := dec.Decode(&event); err != nil {
181+
if err != io.EOF {
182+
select {
183+
case out <- iface.DagImportResult{}:
184+
case <-ctx.Done():
185+
}
186+
}
187+
return
188+
}
189+
190+
select {
191+
case out <- event:
192+
case <-ctx.Done():
193+
return
194+
}
195+
}
196+
}()
197+
198+
return out, nil
199+
}
200+
132201
func (api *httpNodeAdder) core() *HttpApi {
133202
return (*HttpApi)(api)
134203
}

client/rpc/dag_test.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package rpc
2+
3+
import (
4+
"context"
5+
"os"
6+
"testing"
7+
8+
"github.com/ipfs/boxo/files"
9+
"github.com/ipfs/go-cid"
10+
"github.com/ipfs/kubo/core/coreiface/options"
11+
"github.com/ipfs/kubo/test/cli/harness"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestDagImport_Basic(t *testing.T) {
16+
t.Parallel()
17+
18+
ctx := context.Background()
19+
h := harness.NewT(t)
20+
node := h.NewNode().Init().StartDaemon()
21+
defer node.StopDaemon()
22+
23+
apiMaddr, err := node.TryAPIAddr()
24+
require.NoError(t, err)
25+
26+
api, err := NewApi(apiMaddr)
27+
require.NoError(t, err)
28+
29+
// Open test fixture
30+
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
31+
require.NoError(t, err)
32+
defer carFile.Close()
33+
34+
// Import CAR file
35+
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile))
36+
require.NoError(t, err)
37+
38+
// Collect results
39+
var roots []cid.Cid
40+
for result := range results {
41+
if result.Root != nil {
42+
roots = append(roots, result.Root.Cid)
43+
require.Empty(t, result.Root.PinErrorMsg, "pin should succeed")
44+
}
45+
}
46+
47+
// Verify we got exactly one root
48+
require.Len(t, roots, 1, "should have exactly one root")
49+
50+
// Verify the expected root CID
51+
expectedRoot := "bafyreifrm6uf5o4dsaacuszf35zhibyojlqclabzrms7iak67pf62jygaq"
52+
require.Equal(t, expectedRoot, roots[0].String())
53+
}
54+
55+
func TestDagImport_WithStats(t *testing.T) {
56+
t.Parallel()
57+
58+
ctx := context.Background()
59+
h := harness.NewT(t)
60+
node := h.NewNode().Init().StartDaemon()
61+
defer node.StopDaemon()
62+
63+
apiMaddr, err := node.TryAPIAddr()
64+
require.NoError(t, err)
65+
66+
api, err := NewApi(apiMaddr)
67+
require.NoError(t, err)
68+
69+
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
70+
require.NoError(t, err)
71+
defer carFile.Close()
72+
73+
// Import with stats enabled
74+
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
75+
options.Dag.Stats(true))
76+
require.NoError(t, err)
77+
78+
var roots []cid.Cid
79+
var gotStats bool
80+
var blockCount uint64
81+
82+
for result := range results {
83+
if result.Root != nil {
84+
roots = append(roots, result.Root.Cid)
85+
}
86+
if result.Stats != nil {
87+
gotStats = true
88+
blockCount = result.Stats.BlockCount
89+
}
90+
}
91+
92+
require.Len(t, roots, 1, "should have one root")
93+
require.True(t, gotStats, "should receive stats")
94+
require.Equal(t, uint64(4), blockCount, "TestDagStat.car has 4 blocks")
95+
}
96+
97+
func TestDagImport_OfflineWithFastProvide(t *testing.T) {
98+
t.Parallel()
99+
100+
ctx := context.Background()
101+
h := harness.NewT(t)
102+
node := h.NewNode().Init().StartDaemon("--offline=true")
103+
defer node.StopDaemon()
104+
105+
apiMaddr, err := node.TryAPIAddr()
106+
require.NoError(t, err)
107+
108+
api, err := NewApi(apiMaddr)
109+
require.NoError(t, err)
110+
111+
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
112+
require.NoError(t, err)
113+
defer carFile.Close()
114+
115+
// Import with fast-provide enabled in offline mode
116+
// Should succeed gracefully (fast-provide silently skipped)
117+
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
118+
options.Dag.FastProvideRoot(true),
119+
options.Dag.FastProvideWait(true))
120+
require.NoError(t, err)
121+
122+
var roots []cid.Cid
123+
for result := range results {
124+
if result.Root != nil {
125+
roots = append(roots, result.Root.Cid)
126+
}
127+
}
128+
129+
require.Len(t, roots, 1, "import should succeed offline with fast-provide enabled")
130+
}
131+
132+
func TestDagImport_OnlineWithFastProvideWait(t *testing.T) {
133+
t.Parallel()
134+
135+
ctx := context.Background()
136+
h := harness.NewT(t)
137+
node := h.NewNode().Init().StartDaemon()
138+
defer node.StopDaemon()
139+
140+
apiMaddr, err := node.TryAPIAddr()
141+
require.NoError(t, err)
142+
143+
api, err := NewApi(apiMaddr)
144+
require.NoError(t, err)
145+
146+
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
147+
require.NoError(t, err)
148+
defer carFile.Close()
149+
150+
// Import with fast-provide wait enabled in online mode
151+
// This tests that FastProvideWait actually blocks (not fire-and-forget).
152+
// In isolated test environment (no DHT peers), the provide operation may:
153+
// 1. Succeed trivially (announced to randomly discovered peers), or
154+
// 2. Return an error (timeout/no peers)
155+
// Both outcomes prove blocking behavior works correctly.
156+
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
157+
options.Dag.FastProvideRoot(true),
158+
options.Dag.FastProvideWait(true))
159+
160+
if err != nil {
161+
// Blocking wait detected provide failure (no DHT peers in isolated test)
162+
// This proves FastProvideWait actually blocked and error propagated
163+
require.Contains(t, err.Error(), "fast-provide",
164+
"error should be from fast-provide operation")
165+
return // Test passed - blocking wait worked and returned error
166+
}
167+
168+
// No error - provide succeeded, verify we got results
169+
var roots []cid.Cid
170+
for result := range results {
171+
if result.Root != nil {
172+
roots = append(roots, result.Root.Cid)
173+
}
174+
}
175+
176+
require.Len(t, roots, 1, "should receive one root when provide succeeds")
177+
}

core/coreiface/dag.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,43 @@
11
package iface
22

33
import (
4+
"context"
5+
6+
"github.com/ipfs/boxo/files"
7+
"github.com/ipfs/go-cid"
48
ipld "github.com/ipfs/go-ipld-format"
9+
"github.com/ipfs/kubo/core/coreiface/options"
510
)
611

12+
// DagImportResult represents the result of importing roots or stats from CAR files.
13+
// Each result has either Root or Stats set, never both.
14+
type DagImportResult struct {
15+
Root *DagImportRoot
16+
Stats *DagImportStats
17+
}
18+
19+
// DagImportRoot represents a root CID from a CAR file header
20+
type DagImportRoot struct {
21+
Cid cid.Cid
22+
PinErrorMsg string
23+
}
24+
25+
// DagImportStats contains statistics about the import operation
26+
type DagImportStats struct {
27+
BlockCount uint64
28+
BlockBytesCount uint64
29+
}
30+
731
// APIDagService extends ipld.DAGService
832
type APIDagService interface {
933
ipld.DAGService
1034

1135
// Pinning returns special NodeAdder which recursively pins added nodes
1236
Pinning() ipld.NodeAdder
37+
38+
// Import imports data from CAR files.
39+
// Returns a channel that streams results for each root CID found in CAR headers,
40+
// and optionally stats at the end if requested via options.
41+
// Supports importing multiple CAR files, each with multiple roots.
42+
Import(context.Context, files.File, ...options.DagImportOption) (<-chan DagImportResult, error)
1343
}

core/coreiface/options/dag.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package options
2+
3+
type DagImportSettings struct {
4+
PinRoots bool
5+
PinRootsSet bool
6+
Stats bool
7+
StatsSet bool
8+
FastProvideRoot bool
9+
FastProvideRootSet bool
10+
FastProvideWait bool
11+
FastProvideWaitSet bool
12+
}
13+
14+
type DagImportOption func(*DagImportSettings) error
15+
16+
func DagImportOptions(opts ...DagImportOption) (*DagImportSettings, error) {
17+
options := &DagImportSettings{
18+
PinRoots: false,
19+
PinRootsSet: false,
20+
Stats: false,
21+
StatsSet: false,
22+
FastProvideRoot: false,
23+
FastProvideRootSet: false,
24+
FastProvideWait: false,
25+
FastProvideWaitSet: false,
26+
}
27+
28+
for _, opt := range opts {
29+
err := opt(options)
30+
if err != nil {
31+
return nil, err
32+
}
33+
}
34+
35+
return options, nil
36+
}
37+
38+
type dagOpts struct{}
39+
40+
var Dag dagOpts
41+
42+
// PinRoots sets whether to pin roots listed in CAR headers after importing.
43+
// If not set, server uses command default (true).
44+
func (dagOpts) PinRoots(pin bool) DagImportOption {
45+
return func(settings *DagImportSettings) error {
46+
settings.PinRoots = pin
47+
settings.PinRootsSet = true
48+
return nil
49+
}
50+
}
51+
52+
// Stats enables output of import statistics (block count and bytes).
53+
// If not set, server uses command default (false).
54+
func (dagOpts) Stats(enable bool) DagImportOption {
55+
return func(settings *DagImportSettings) error {
56+
settings.Stats = enable
57+
settings.StatsSet = true
58+
return nil
59+
}
60+
}
61+
62+
// FastProvideRoot sets whether to immediately provide root CIDs to DHT for faster discovery.
63+
// If not set, server uses Import.FastProvideRoot config value (default: true).
64+
func (dagOpts) FastProvideRoot(enable bool) DagImportOption {
65+
return func(settings *DagImportSettings) error {
66+
settings.FastProvideRoot = enable
67+
settings.FastProvideRootSet = true
68+
return nil
69+
}
70+
}
71+
72+
// FastProvideWait sets whether to block until fast provide completes.
73+
// If not set, server uses Import.FastProvideWait config value (default: false).
74+
func (dagOpts) FastProvideWait(enable bool) DagImportOption {
75+
return func(settings *DagImportSettings) error {
76+
settings.FastProvideWait = enable
77+
settings.FastProvideWaitSet = true
78+
return nil
79+
}
80+
}

0 commit comments

Comments
 (0)