Skip to content

Commit 70f9809

Browse files
committed
feat: add ?meta=eof for trailling metadata dag-json
Ref: ipfs/specs#431
1 parent 7ca8f48 commit 70f9809

File tree

8 files changed

+430
-47
lines changed

8 files changed

+430
-47
lines changed

carstream.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,23 @@ var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser)
2929

3030
// StreamCar streams a DAG in CARv1 format to the given writer, using the given
3131
// selector.
32-
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) error {
32+
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) (int64, int64, error) {
3333
sel, err := selector.CompileSelector(selNode)
3434
if err != nil {
35-
return fmt.Errorf("failed to compile selector: %w", err)
35+
return 0, 0, fmt.Errorf("failed to compile selector: %w", err)
3636
}
3737

3838
carWriter, err := carstorage.NewWritable(out, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(duplicates))
3939
if err != nil {
40-
return fmt.Errorf("failed to create car writer: %w", err)
40+
return 0, 0, fmt.Errorf("failed to create car writer: %w", err)
4141
}
4242

43-
erro := &errorRecordingReadOpener{ctx, requestLsys.StorageReadOpener, carWriter, nil}
43+
erro := newErrorRecordingReadOpener(ctx, requestLsys.StorageReadOpener, carWriter)
4444
requestLsys.StorageReadOpener = erro.StorageReadOpener
4545

4646
rootNode, err := loadNode(ctx, rootCid, requestLsys)
4747
if err != nil {
48-
return fmt.Errorf("failed to load root node: %w", err)
48+
return 0, 0, fmt.Errorf("failed to load root node: %w", err)
4949
}
5050

5151
progress := traversal.Progress{Cfg: &traversal.Config{
@@ -54,20 +54,26 @@ func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.
5454
LinkTargetNodePrototypeChooser: protoChooser,
5555
}}
5656
if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil {
57-
return fmt.Errorf("failed to complete traversal: %w", err)
57+
return 0, 0, fmt.Errorf("failed to complete traversal: %w", err)
5858
}
5959
if erro.err != nil {
60-
return fmt.Errorf("block load failed during traversal: %w", erro.err)
60+
return 0, 0, fmt.Errorf("block load failed during traversal: %w", erro.err)
6161
}
6262

63-
return nil
63+
return erro.byteCount, erro.blockCount, nil
6464
}
6565

6666
type errorRecordingReadOpener struct {
67-
ctx context.Context
68-
orig linking.BlockReadOpener
69-
car carstorage.WritableCar
70-
err error
67+
ctx context.Context
68+
orig linking.BlockReadOpener
69+
car carstorage.WritableCar
70+
err error
71+
byteCount int64
72+
blockCount int64
73+
}
74+
75+
func newErrorRecordingReadOpener(ctx context.Context, orig linking.BlockReadOpener, car carstorage.WritableCar) *errorRecordingReadOpener {
76+
return &errorRecordingReadOpener{ctx, orig, car, nil, 0, 0}
7177
}
7278

7379
func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
@@ -84,6 +90,8 @@ func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext,
8490
if err != nil {
8591
return nil, err
8692
}
93+
erro.byteCount += int64(len(byts))
94+
erro.blockCount++
8795
return bytes.NewReader(byts), nil
8896
}
8997

carstream_test.go

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -50,61 +50,73 @@ func TestStreamCar(t *testing.T) {
5050
}
5151

5252
testCases := []struct {
53-
name string
54-
selector datamodel.Node
55-
root cid.Cid
56-
lsys linking.LinkSystem
57-
validate func(t *testing.T, r io.Reader)
53+
name string
54+
selector datamodel.Node
55+
root cid.Cid
56+
lsys linking.LinkSystem
57+
expectedBytes int64
58+
expectedBlocks int64
59+
validate func(t *testing.T, r io.Reader)
5860
}{
5961
{
60-
name: "chain: all blocks",
61-
selector: selectorparse.CommonSelector_ExploreAllRecursively,
62-
root: tbc.TipLink.(cidlink.Link).Cid,
63-
lsys: chainLsys,
62+
name: "chain: all blocks",
63+
selector: selectorparse.CommonSelector_ExploreAllRecursively,
64+
root: tbc.TipLink.(cidlink.Link).Cid,
65+
lsys: chainLsys,
66+
expectedBytes: sizeOf(allChainBlocks),
67+
expectedBlocks: 100,
6468
validate: func(t *testing.T, r io.Reader) {
6569
root, blks := carToBlocks(t, r)
6670
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
6771
require.Equal(t, allChainBlocks, blks)
6872
},
6973
},
7074
{
71-
name: "chain: just root",
72-
selector: selectorparse.CommonSelector_MatchPoint,
73-
root: tbc.TipLink.(cidlink.Link).Cid,
74-
lsys: chainLsys,
75+
name: "chain: just root",
76+
selector: selectorparse.CommonSelector_MatchPoint,
77+
root: tbc.TipLink.(cidlink.Link).Cid,
78+
lsys: chainLsys,
79+
expectedBytes: sizeOf(allChainBlocks[:1]),
80+
expectedBlocks: 1,
7581
validate: func(t *testing.T, r io.Reader) {
7682
root, blks := carToBlocks(t, r)
7783
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
7884
require.Equal(t, []blocks.Block{allChainBlocks[0]}, blks)
7985
},
8086
},
8187
{
82-
name: "unixfs file",
83-
selector: selectorparse.CommonSelector_ExploreAllRecursively,
84-
root: fileEnt.Root,
85-
lsys: fileLsys,
88+
name: "unixfs file",
89+
selector: selectorparse.CommonSelector_ExploreAllRecursively,
90+
root: fileEnt.Root,
91+
lsys: fileLsys,
92+
expectedBytes: sizeOfDirEnt(fileEnt, fileLsys),
93+
expectedBlocks: int64(len(fileEnt.SelfCids)),
8694
validate: func(t *testing.T, r io.Reader) {
8795
root, blks := carToBlocks(t, r)
8896
require.Equal(t, fileEnt.Root, root)
8997
require.ElementsMatch(t, fileEnt.SelfCids, blkCids(blks))
9098
},
9199
},
92100
{
93-
name: "unixfs directory",
94-
selector: selectorparse.CommonSelector_ExploreAllRecursively,
95-
root: dirEnt.Root,
96-
lsys: dirLsys,
101+
name: "unixfs directory",
102+
selector: selectorparse.CommonSelector_ExploreAllRecursively,
103+
root: dirEnt.Root,
104+
lsys: dirLsys,
105+
expectedBytes: sizeOfDirEnt(dirEnt, dirLsys),
106+
expectedBlocks: blocksInDirEnt(dirEnt),
97107
validate: func(t *testing.T, r io.Reader) {
98108
root, blks := carToBlocks(t, r)
99109
require.Equal(t, dirEnt.Root, root)
100110
require.ElementsMatch(t, entCids(dirEnt), blkCids(blks))
101111
},
102112
},
103113
{
104-
name: "unixfs sharded directory",
105-
selector: selectorparse.CommonSelector_ExploreAllRecursively,
106-
root: shardedDirEnt.Root,
107-
lsys: shardedDirLsys,
114+
name: "unixfs sharded directory",
115+
selector: selectorparse.CommonSelector_ExploreAllRecursively,
116+
root: shardedDirEnt.Root,
117+
lsys: shardedDirLsys,
118+
expectedBytes: sizeOfDirEnt(shardedDirEnt, shardedDirLsys),
119+
expectedBlocks: blocksInDirEnt(shardedDirEnt),
108120
validate: func(t *testing.T, r io.Reader) {
109121
root, blks := carToBlocks(t, r)
110122
require.Equal(t, shardedDirEnt.Root, root)
@@ -118,8 +130,10 @@ func TestStreamCar(t *testing.T) {
118130
t.Run(tc.name, func(t *testing.T) {
119131
req := require.New(t)
120132
var buf bytes.Buffer
121-
err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
133+
byts, blks, err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
122134
req.NoError(err)
135+
req.Equal(tc.expectedBytes, byts)
136+
req.Equal(tc.expectedBlocks, blks)
123137
tc.validate(t, &buf)
124138
})
125139
}
@@ -200,3 +214,33 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
200214
}
201215
}
202216
}
217+
218+
func sizeOf(blks []blocks.Block) int64 {
219+
var size int64
220+
for _, blk := range blks {
221+
size += int64(len(blk.RawData()))
222+
}
223+
return size
224+
}
225+
func sizeOfDirEnt(dirEnt unixfs.DirEntry, ls linking.LinkSystem) int64 {
226+
var size int64
227+
for _, c := range dirEnt.SelfCids {
228+
blk, err := ls.LoadRaw(linking.LinkContext{}, cidlink.Link{Cid: c})
229+
if err != nil {
230+
panic(err)
231+
}
232+
size += int64(len(blk))
233+
}
234+
for _, c := range dirEnt.Children {
235+
size += sizeOfDirEnt(c, ls)
236+
}
237+
return size
238+
}
239+
240+
func blocksInDirEnt(dirEnt unixfs.DirEntry) int64 {
241+
size := int64(len(dirEnt.SelfCids))
242+
for _, c := range dirEnt.Children {
243+
size += blocksInDirEnt(c)
244+
}
245+
return size
246+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/ipfs/go-unixfsnode v1.7.3
1414
github.com/ipld/go-car/v2 v2.10.1
1515
github.com/ipld/go-codec-dagpb v1.6.0
16-
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55
16+
github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb
1717
github.com/ipni/go-libipni v0.3.4
1818
github.com/ipni/index-provider v0.13.5
1919
github.com/libp2p/go-libp2p v0.29.2

0 commit comments

Comments
 (0)